In [ ]:
import gtk
import gobject
import threading
import datetime as dt
import matplotlib as mpl
import matplotlib.style
import numpy as np
import pandas as pd
from streaming_plot import StreamingPlot
def _generate_data(stop_event, data_ready, data):
    '''
    Generate random data to emulate, e.g., reading data from ADC.

    The function is run in its own thread.

    On each pass, one :class:`pandas.Series` of ``samples_per_plot`` random
    values (from :func:`numpy.random.rand`) is appended to :data:`data`.  The
    series is indexed by timestamps spaced ``delta_t`` apart, starting at the
    current time.  The function then waits ``samples_per_plot * delta_t``
    seconds on :data:`stop_event` before generating the next batch.

    Parameters
    ----------
    stop_event : threading.Event
        Function returns when :data:`stop_event` is set.
    data_ready : threading.Event
        Function sets :data:`data_ready` whenever new data is available.
    data : list
        List to which each newly generated :class:`pandas.Series` is appended.

    Notes
    -----
    The function **MUST**:

     - Return when the :data:`stop_event` is set.
     - Set :data:`data_ready` event whenever new data is available.

    One batch is always generated before :data:`stop_event` is first checked.
    '''
    delta_t = dt.timedelta(seconds=.1)
    samples_per_plot = 5

    while True:
        time_0 = dt.datetime.now()
        values_i = np.random.rand(samples_per_plot)
        # Use `range` rather than the Python 2-only `xrange` so the generator
        # runs unchanged on both Python 2 and Python 3.
        absolute_times_i = pd.Series([time_0 + i * delta_t
                                      for i in range(len(values_i))])
        data_i = pd.Series(values_i, index=absolute_times_i)
        data.append(data_i)
        data_ready.set()
        # `Event.wait` doubles as the inter-batch delay: it returns `True` as
        # soon as the stop event is set, ending the loop without sleeping out
        # the remainder of the period.
        if stop_event.wait(samples_per_plot * delta_t.total_seconds()):
            break
def measure_dialog(f_data, duration_s=None, auto_start=True,
                   auto_close=True):
    '''
    Launch a GTK dialog and plot data

    Parameters
    ----------
    f_data : function(stop_event, data_ready, data)
        Function to run to generate data, e.g., read data from ADC.

        The function is run in its own thread and is provided the following
        parameters:

         - :data:`stop_event` : threading.Event
         - :data:`data_ready` : threading.Event
         - :data:`data` : list

        The function **MUST**:

         - Return when the :data:`stop_event` is set.
         - Set :data:`data_ready` event whenever new data is available.
    duration_s : float, optional
        Length of time to measure for (in seconds).

        If duration is not specified, measure until window is closed or
        ``Pause`` button is pressed.
    auto_start : bool, optional
        Automatically start measuring when the dialog is launched.

        Default is ``True``.
    auto_close : bool, optional
        If ``duration_s`` is specified, automatically close window once the
        measurement duration has passed (unless the ``Pause`` button has been
        pressed).

        If ``False``, ``view.pause`` is called instead once the duration has
        passed.

        Default is ``True``.

    Returns
    -------
    object or None
        Result of ``pandas.concat`` applied to ``view.data``, or ``None`` if
        ``view.data`` is empty once the dialog has been closed.
    '''
    # `StreamingPlot` class uses threads.  Need to initialize GTK to use
    # threads. See [here][1] for more information.
    #
    # [1]: http://faq.pygtk.org/index.py?req=show&file=faq20.001.htp
    gtk.gdk.threads_init()

    # Pass the styles as a *list*: the second positional argument of
    # `mpl.style.context` is `after_reset`, so passing the rc overrides there
    # would reset rcParams instead of applying the overrides.
    with mpl.style.context(['seaborn',
                            {'image.cmap': 'gray',
                             'image.interpolation': 'none'}]):
        # Create dialog window to wrap PMT measurement view widget.
        dialog = gtk.Dialog()
        dialog.set_default_size(800, 600)

        view = StreamingPlot(data_func=f_data)
        dialog.get_content_area().pack_start(view.widget, True, True)
        # Let the plot view react whenever the dialog is resized.
        dialog.connect('check-resize', lambda *args: view.on_resize())
        dialog.set_position(gtk.WIN_POS_MOUSE)
        dialog.show_all()
        view.fig.tight_layout()

        if auto_start:
            # Defer start until the GTK main loop is running.
            gobject.idle_add(view.start)

        def _auto_close(*args):
            if not view.stop_event.is_set():
                # User did not explicitly pause the measurement.
                # Automatically close the measurement and continue.
                dialog.destroy()
            # Implicitly returns `None`, so the timeout fires only once.

        if duration_s is not None:
            stop_func = _auto_close if auto_close else view.pause
            # `gobject.timeout_add` expects the interval in *milliseconds*,
            # whereas `duration_s` is specified in seconds.
            # NOTE(review): if `view.pause` returns a true value, the timeout
            # will repeat every `duration_s` seconds -- confirm.
            gobject.timeout_add(int(round(duration_s * 1000)), stop_func)

        # Block the return until the plot widget has actually been destroyed.
        measurement_complete = threading.Event()
        view.widget.connect('destroy',
                            lambda *args: measurement_complete.set())

        dialog.run()
        dialog.destroy()

        measurement_complete.wait()
        if view.data:
            return pd.concat(view.data)
        else:
            return None


# Demo: measure for 5 seconds, then close the dialog automatically.
data = measure_dialog(_generate_data, duration_s=5, auto_close=True)
In [ ]:
# Create a stand-alone streaming plot view fed by the random data generator.
# NOTE(review): nothing in this cell starts the view or packs `view.widget`
# into a window -- confirm whether `view.data` is populated before the next
# cell concatenates it.
view = StreamingPlot(data_func=_generate_data)
In [ ]:
from IPython.display import display
import bz2
# Combine all chunks collected in `view.data` into a single pandas object.
data = pd.concat(view.data)

# Serialize to JSON text, then compress.  `bz2.compress` operates on bytes, so
# encode the JSON text explicitly (a no-op for ASCII text on Python 2,
# required on Python 3).
data_json = data.to_json()
data_json_bz2 = bz2.compress(data_json.encode('utf-8'))

# Round trip: decompress, decode back to text and parse as a series.
data_from_json = pd.read_json(bz2.decompress(data_json_bz2).decode('utf-8'),
                              typ='series')

# Cell output: size of the JSON text vs. size of the compressed payload.
len(data_json), len(data_json_bz2)
In [ ]:
import gobject
import gtk
import mr_box_peripheral_board
import mr_box_peripheral_board.ui.gtk.pump_ui as pump_ui
# Re-import the pump UI module so this cell uses the current version of
# `pump_ui` without restarting the interpreter.
reload(pump_ui)

# `PumpControl` class uses threads. Need to initialize GTK to use threads.
# See [here][1] for more information.
#
# [1]: http://faq.pygtk.org/index.py?req=show&file=faq20.001.htp
gtk.gdk.threads_init()

# Create pump control view widget.
# NOTE(review): the first positional argument is `None` here -- presumably
# the board/proxy handle; confirm what `PumpControl` does without one.
pump_control_view = pump_ui.PumpControl(None, frequency_hz=100, duration_s=10)

# Start pump automatically.  `idle_add` defers the call until the GTK main
# loop (entered by `dialog.run()` below) is running.
gobject.idle_add(pump_control_view.start)

# Create dialog window to wrap pump control view widget.
dialog = gtk.Dialog()
dialog.get_content_area().pack_start(pump_control_view.widget, True, True)
dialog.set_position(gtk.WIN_POS_MOUSE)
dialog.show_all()

# Run the dialog's main loop until the dialog is dismissed, then dispose of
# the window.
dialog.run()
dialog.destroy()